package com.tang.module.canal.handle;

import cn.hutool.core.map.MapUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.InvalidProtocolBufferException;
import com.tang.common.annotation.canal.*;
import com.tang.common.constant.SafetyConstant;
import com.tang.common.resources.CanalDistribute;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
 * 将canal接收到的信息分发到对应监听器
 * @author tang
 * @date 2022/1/4 21:35
 */
@Slf4j
@Component
public class CanalCoreDispenser implements CanalDistribute {

	/**
	 * canal缓存更新控制器
	 */
	private static final Map<String, Object> BEANS_WITH_ANNOTATION;


	static {
		BEANS_WITH_ANNOTATION = SpringUtil.getApplicationContext().getBeansWithAnnotation(CanalEventListener.class);
		log.info("canal缓存更新控制器总数: {}, bean列表如下:",BEANS_WITH_ANNOTATION.size());
		for (String s : BEANS_WITH_ANNOTATION.keySet()) {
			log.info(s);
		}
	}

	@Override
	public  void  distributionProcessing(List<CanalEntry.Entry> entryList) throws InvalidProtocolBufferException {
		for (CanalEntry.Entry entry : entryList) {
			//是对行数据进行处理
			if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) {
				//解析行数据，原始为字节数组，需要解析为实体对象
				CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
				//获得消息类型
				CanalEntry.EventType eventType = rowChange.getEventType();
				//过滤掉无用数据库和表
				if ( !screen(entry)) {
					return;
				}
				log.info("canal缓存更新,表名称 = {}",entry.getHeader().getTableName());
				//删除
				if (eventType == CanalEntry.EventType.DELETE) {
					if (MapUtil.isNotEmpty(BEANS_WITH_ANNOTATION)){
						deleteDistribution(entry);
					}
					//更新
				} else if (eventType == CanalEntry.EventType.UPDATE) {
					if (MapUtil.isNotEmpty(BEANS_WITH_ANNOTATION)){
						updateDistribution(entry);
					}
					//插入
				} else if (eventType == CanalEntry.EventType.INSERT) {
					if (MapUtil.isNotEmpty(BEANS_WITH_ANNOTATION)){
						insertDistribution(entry);
					}
					//其他DDL
				}else {
					if (MapUtil.isNotEmpty(BEANS_WITH_ANNOTATION)){
						otherDistribution(entry);
					}
				}
			}
		}
	}

	@Override
	public void updateDistribution(CanalEntry.Entry entry){
		for (Object value : BEANS_WITH_ANNOTATION.values()) {
			Method[] methods = value.getClass().getMethods();
			for (Method method : methods) {
				CanalUpdatePoint updatePoint = AnnotatedElementUtils.findMergedAnnotation(method, CanalUpdatePoint.class);
				if (updatePoint != null && matchChain(entry,updatePoint.tableName(), updatePoint.database())){
					try {
						method.invoke(value,entry);
					} catch (IllegalAccessException | InvocationTargetException e) {
						e.printStackTrace();
						log.error("执行{}方法异常，canal更新操作失败",method);
					}
				}
			}
		}
	}

	@Override
	public void insertDistribution(CanalEntry.Entry entry){
		for (Object value : BEANS_WITH_ANNOTATION.values()) {
			Method[] methods = value.getClass().getMethods();
			for (Method method : methods) {
				CanalInsertPoint inserting = AnnotatedElementUtils.findMergedAnnotation(method, CanalInsertPoint.class);
				if (inserting != null && matchChain(entry,inserting.tableName(), inserting.database())){
					try {
						method.invoke(value,entry);
					} catch (IllegalAccessException | InvocationTargetException e) {
						e.printStackTrace();
						log.error("执行{}方法异常，canal插入操作失败",method);
					}
				}
			}
		}
	}

	@Override
	public void deleteDistribution(CanalEntry.Entry entry){
		for (Object value : BEANS_WITH_ANNOTATION.values()) {
			Method[] methods = value.getClass().getMethods();
			for (Method method : methods) {
				CanalDeletePoint deletePoint = AnnotatedElementUtils.findMergedAnnotation(method, CanalDeletePoint.class);
				if (deletePoint != null && matchChain(entry,deletePoint.tableName(), deletePoint.database())){
					try {
						method.invoke(value,entry);
					} catch (IllegalAccessException | InvocationTargetException e) {
						e.printStackTrace();
						log.error("执行{}方法异常，canal删除操作失败",method);
					}
				}
			}
		}
	}

	@Override
	public void otherDistribution(CanalEntry.Entry entry){
		for (Object value : BEANS_WITH_ANNOTATION.values()) {
			Method[] methods = value.getClass().getMethods();
			for (Method method : methods) {
				CanalDDLPoint ddlPoint = AnnotatedElementUtils.findMergedAnnotation(method, CanalDDLPoint.class);
				if (ddlPoint != null){
					try {
						method.invoke(value,entry);
					} catch (IllegalAccessException | InvocationTargetException e) {
						e.printStackTrace();
						log.error("执行{}方法异常，canalDDL操作失败",method);
					}
				}
			}
		}
	}

	@Override
	public Boolean matchChain(CanalEntry.Entry entry, String tableName,String databaseName){
		//表名是否一致
		Predicate<CanalEntry.Entry > predicate = e ->  e.getHeader().getTableName().equals(tableName)
			|| tableName.equals(SafetyConstant.ALL);

		//数据库名是否一致
		Predicate<CanalEntry.Entry > predicate1 = e ->  e.getHeader().getSchemaName().equals(databaseName)
			|| databaseName.equals(SafetyConstant.ALL);

		return predicate.and(predicate1).test(entry);
	}

	@Override
	public Boolean screen(CanalEntry.Entry entry) {
		//这里排除除tao之外的其他数据库,可以按照自己的要求设置此项目
		if (! "tao".equals(entry.getHeader().getSchemaName())){
			return false;
		}
		//这里排除全部包含log字样的表，因为这些都是记录日志的表，无需同步
		if (entry.getHeader().getTableName().contains("log")) {
			return false;
		}
		return Boolean.TRUE;
	}
}
